 1.大数据高速计算引擎Spark(上)之Spark SQL中Spark SQL编程下的Action操作
   
   与RDD类似的操作
   show、collect、collectAsList、head、first、count、take、takeAsList、reduce
   与结构相关
   printSchema、explain、columns、dtypes、col
   
EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10


import spark.implicits._
val df1 : DataFrame = spark.read.
  option("header", "true").
  option("inferschema", "true").
  csv("data/emp.dat")
df1.printSchema()

df1.count

// 缺省显示20行
df1.union(df1).show()
// 显示2行
df1.show(2)
// 不截断字符
df1.toJSON.show(false)
// 显示10行，不截断字符
df1.toJSON.show(10, false)
spark.catalog.listFunctions.show(10000, false)

// collect返回的是数组, Array[org.apache.spark.sql.Row]
val c1 = df1.collect()

// collectAsList返回的是List, List[org.apache.spark.sql.Row]
val c2 = df1.collectAsList()

// 返回 org.apache.spark.sql.Row
val h1 = df1.head()
val f1 = df1.first()

// 返回 Array[org.apache.spark.sql.Row]，长度为3
val h2 = df1.head(3)
val f2 = df1.take(3)

// 返回 List[org.apache.spark.sql.Row]，长度为2
val t2 = df1.takeAsList(2)

// 结构属性
df1.columns      // 查看列名
df1.dtypes       // 查看列名和类型
df1.explain()    // 参看执行计划
df1.col("name")  // 获取某个列
df1.printSchema  // 常用
 
 2.Transformation 操作
   
   select * from tab where ... group by ... having... order by...
   
   RDD类似的操作
   持久化/缓存与checkpoint
   select
where
group by / 聚合
order by
join
集合操作
空值操作(函数)
函数

   与RDD类似的操作
   map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、distinct
、dropDuplicates、describe
   
df1.map(row=>row.getAs[Int](0)).show

// randomSplit(与RDD类似，将DF、DS按给定参数分成多份)
val df2  = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

// 取10行数据生成新的DataSet
val df2 = df1.limit(10)

// distinct，去重
val df3 = df1.union(df1)
df3.distinct.count

// dropDuplicates，按列值去重
df2.dropDuplicates.show
df2.dropDuplicates("mgr", "deptno").show
df2.dropDuplicates("mgr").show
df2.dropDuplicates("deptno").show

// 返回全部列的统计（count、mean、stddev、min、max）
df1.describe().show

// 返回指定列的统计
df1.describe("sal").show
df1.describe("sal", "comm").show

   存储相关
   cacheTable、persist、checkpoint、unpersist、cache
   备注：Dataset 默认的存储级别是 MEMORY_AND_DISK
   
import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("hdfs://linux121:9000/checkpoint")

df1.show()
df1.checkpoint()
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)

df1.createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")
spark.catalog.uncacheTable("t1")
    
   select相关
   列的多种表示、select、selectExpr
   drop、withColumn、withColumnRenamed、cast(内置函数)
// 列的多种表示方法。使用""、$""、'、col()、ds("")
// 注意：不要混用；必要时使用spark.implicitis._；并非每个表示在所有的
// 地方都有效
df1.select($"ename", $"hiredate", $"sal").show
df1.select("ename", "hiredate", "sal").show
df1.select('ename, 'hiredate, 'sal).show
df1.select(col("ename"), col("hiredate"), col("sal")).show
df1.select(df1("ename"), df1("hiredate"), df1("sal")).show

// 下面的写法无效，其他列的表示法有效
df1.select("ename", "hiredate", "sal"+100).show
df1.select("ename", "hiredate", "sal+100").show
// 这样写才符合语法
df1.select($"ename", $"hiredate", $"sal"+100).show
df1.select('ename, 'hiredate, 'sal+100).show

// 可使用expr表达式(expr里面只能使用引号)
df1.select(expr("comm+100"), expr("sal+100"),expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("power(sal, 2)", "sal").show
df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show

// drop、withColumn、 withColumnRenamed、casting
// drop 删除一个或多个列，得到新的DF
df1.drop("mgr")
df1.drop("empno", "mgr").show

// withColumn，修改列值
val df2 = df1.withColumn("sal", $"sal"+1000)
df2.show

// withColumnRenamed，更改列名
df1.withColumnRenamed("sal", "newsal")
// 备注：drop、withColumn、withColumnRenamed返回的是DF
// cast，类型转换

df1.selectExpr("cast(empno as string)").printSchema
import org.apache.spark.sql.types._
df1.select('empno.cast(StringType)).printSchema
   
   where相关
   where == filter

// where操作
df1.where("sal>1000").show
df1.where("sal>1000 and job=='MANAGER'").show

// 操作
df1.filter("sal>1000").show
df1.filter("sal>1000 and job=='MANAGER'").show
   
   groupBy相关
   groupBy、agg、max、min、avg、sum、count(后面5个为内置函数)

// groupBy、max、min、mean、sum、count（与df1.count不同）
df1.groupBy("Job").sum("sal").show
df1.groupBy("Job").max("sal").show
df1.groupBy("Job").min("sal").show
df1.groupBy("Job").avg("sal").show
df1.groupBy("Job").count.show

// 类似having子句
df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show

// agg
df1.groupBy("Job").agg("sal"->"max", "sal"->"min",
 "sal"->"avg", "sal"->"sum", "sal"->"count").show
df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", 
 "sal"->"avg", "sal"->"sum", "sal"->"count").show

// 这种方式更好理解
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"),
sum("sal"), count("sal")).show
// 给列取别名
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"),
sum("sal"), count("sal")).withColumnRenamed("min(sal)", "min1").show
// 给列取别名，最简便
df1.groupBy("Job").agg(max("sal").as("max1"),
min("sal").as("min2"), avg("sal").as("avg3"),
sum("sal").as("sum4"), count("sal").as("count5")).show

   orderBy相关
   orderBy == sort

// orderBy
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show
// 降序
df1.orderBy(-$"sal").show
df1.orderBy('sal).show
df1.orderBy(col("sal")).show
df1.orderBy(df1("sal")).show

df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'deptno, -'sal).show

// sort，以下语句等价
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort('sal).show
df1.sort(col("sal")).show
df1.sort(df1("sal")).show

df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'deptno, -'sal).show

   join相关

// 1、笛卡尔积
df1.crossJoin(df1).count

// 2、等值连接（单字段）（连接字段empno，仅显示了一次）
df1.join(df1, "empno").count

// 3、等值连接（多字段）（连接字段empno、ename，仅显示了一次）
df1.join(df1, Seq("empno", "ename")).show

// 定义第一个数据集
case class StudentAge(sno: Int, name: String, age: Int)
val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19),
StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20))
val ds1 = spark.createDataset(lst)
ds1.show()

// 定义第二个数据集
case class StudentHeight(sname: String, height: Int)
val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), 
StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))
val ds2 = rdd.toDS

// 备注：不能使用双引号，而且这里是 ===
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, 'name==='sname).show
ds1.join(ds2, ds1("name")===ds2("sname")).show
ds1.join(ds2, ds1("sname")===ds2("sname"), "inner").show
ds1.join(ds2, 'name==='sname, "inner").show

// 多种连接方式
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, $"name"===$"sname", "inner").show

ds1.join(ds2, $"name"===$"sname", "left").show
ds1.join(ds2, $"name"===$"sname", "left_outer").show

ds1.join(ds2, $"name"===$"sname", "right").show
ds1.join(ds2, $"name"===$"sname", "right_outer").show

ds1.join(ds2, $"name"===$"sname", "outer").show
ds1.join(ds2, $"name"===$"sname", "full").show
ds1.join(ds2, $"name"===$"sname", "full_outer").show

备注：DS在join操作之后变成了DF

   集合相关
   union==unionAll（过期）、intersect、except
   
// union、unionAll、intersect、except。集合的交、并、差
val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")

// union 求并集，不去重
ds3.union(ds4).show

// unionAll、union 等价；unionAll过期方法，不建议使用
ds3.unionAll(ds4).show

// intersect 求交
ds3.intersect(ds4).show

// except 求差
ds3.except(ds4).show

   空值处理
   na.fill、na.drop
   
// NaN (Not a Number)
math.sqrt(-1.0)
math.sqrt(-1.0).isNaN()

df1.show
// 删除所有列的空值和NaN
df1.na.drop.show

// 删除某列的空值和NaN
df1.na.drop(Array("mgr")).show

// 对全部列填充；对指定单列填充；对指定多列填充
df1.na.fill(1000).show
df1.na.fill(1000, Array("comm")).show
df1.na.fill(Map("mgr"->2000, "comm"->1000)).show

// 对指定的值进行替换
df1.na.replace("comm" :: "deptno" :: Nil, Map(0 -> 100, 10 -> 100)).show

// 查询空值列或非空值列。isNull、isNotNull为内置函数
df1.filter("comm is null").show
df1.filter($"comm".isNull).show
df1.filter(col("comm").isNull).show

df1.filter("comm is not null").show
df1.filter(col("comm").isNotNull).show

   窗口函数
   一般情况下窗口函数不用 DSL 处理，直接用SQL更方便
   参考源码Window.scala、WindowSpec.scala(主要)

import org.apache.spark.sql.expressions.Window
val w1 = Window.partitionBy("cookieid").orderBy("createtime")
val w2 = Window.partitionBy("cookieid").orderBy("pv")
val w3 = w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w4 = w1.rowsBetween(-1, 1)

// 聚组函数【用分析函数的数据集】
df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w3).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w4).as("pv1")).show

// 排名
df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show
df.select($"cookieid", $"pv", dense_rank().over(w2).alias("denserank")).show
df.select($"cookieid", $"pv", row_number().over(w2).alias("rownumber")).show

// lag、lead
df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("rownumber")).show
df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("rownumber")).show

   内建函数
   http://spark.apache.org/docs/latest/api/sql/index.html
   